/*-
 * Copyright (c) 2014-2020 MongoDB, Inc.
 * Copyright (c) 2008-2014 WiredTiger, Inc.
 *	All rights reserved.
 *
 * See the file LICENSE for redistribution information.
 */

#include "wt_internal.h"

/*
 * __async_get_key --
 *     WT_ASYNC_OP->get_key implementation for op handles. The variable arguments are forwarded,
 *     together with the flags of the cursor embedded in the op handle, to the generic cursor
 *     get-key routine, and that routine's return value is handed back to the caller.
 */
static int
__async_get_key(WT_ASYNC_OP *asyncop, ...)
{
    WT_DECL_RET;
    va_list args;

    /* The argument list has to be closed before returning, so hold the result in ret. */
    va_start(args, asyncop);
    ret = __wt_cursor_get_keyv(&asyncop->c, asyncop->c.flags, args);
    va_end(args);

    return (ret);
}

/*
 * __async_set_key --
 *     WT_ASYNC_OP->set_key implementation for op handles. The method has no return value, so the
 *     outcome is recorded in the saved_err field of the cursor embedded in the op handle.
 */
static void
__async_set_key(WT_ASYNC_OP *asyncop, ...)
{
    WT_CURSOR *c;
    va_list ap;

    c = &asyncop->c;

    /*
     * Keep the result of setting the key instead of discarding it. Previously a failure here could
     * be overwritten by the result of the copy below, hiding the original error.
     */
    va_start(ap, asyncop);
    c->saved_err = __wt_cursor_set_keyv(c, c->flags, ap);
    va_end(ap);

    /*
     * Copy the key into the cursor's own buffer if it is pointing at data elsewhere. The copy is
     * skipped for record-number cursors, and when setting the key failed: in that case the key
     * item cannot be trusted and the error already recorded must be preserved.
     */
    if (c->saved_err == 0 && !WT_DATA_IN_ITEM(&c->key) && !WT_CURSOR_RECNO(c))
        c->saved_err =
          __wt_buf_set(O2S((WT_ASYNC_OP_IMPL *)asyncop), &c->key, c->key.data, c->key.size);
}

/*
 * __async_get_value --
 *     WT_ASYNC_OP->get_value implementation for op handles. The variable arguments are forwarded to
 *     the generic cursor get-value routine, operating on the cursor embedded in the op handle, and
 *     that routine's return value is handed back to the caller.
 */
static int
__async_get_value(WT_ASYNC_OP *asyncop, ...)
{
    WT_DECL_RET;
    va_list args;

    /* The argument list has to be closed before returning, so hold the result in ret. */
    va_start(args, asyncop);
    ret = __wt_cursor_get_valuev(&asyncop->c, args);
    va_end(args);

    return (ret);
}

/*
 * __async_set_value --
 *     WT_ASYNC_OP->set_value implementation for op handles. The method has no return value, so the
 *     outcome is recorded in the saved_err field of the cursor embedded in the op handle.
 */
static void
__async_set_value(WT_ASYNC_OP *asyncop, ...)
{
    WT_CURSOR *c;
    va_list ap;

    c = &asyncop->c;

    /*
     * Keep the result of setting the value instead of discarding it. Previously a failure here
     * could be overwritten by the result of the copy below, hiding the original error.
     */
    va_start(ap, asyncop);
    c->saved_err = __wt_cursor_set_valuev(c, ap);
    va_end(ap);

    /*
     * Copy the data, if it is pointing at data elsewhere. The copy is skipped when setting the
     * value failed: in that case the value item cannot be trusted and the error already recorded
     * must be preserved.
     */
    if (c->saved_err == 0 && !WT_DATA_IN_ITEM(&c->value))
        c->saved_err =
          __wt_buf_set(O2S((WT_ASYNC_OP_IMPL *)asyncop), &c->value, c->value.data, c->value.size);
}

/*
 * __async_op_wrap --
 *     Common wrapper for all async operations: record the requested operation type in the op
 *     handle, then enqueue the handle on the work queue. Returns the result of the enqueue.
 */
static int
__async_op_wrap(WT_ASYNC_OP_IMPL *op, WT_ASYNC_OPTYPE type)
{
    /* The type is stored before the handle is handed to the queue. */
    op->optype = type;
    return (__wt_async_op_enqueue(O2S(op), op));
}

/*
 * __async_search --
 *     WT_ASYNC_OP->search implementation for op handles. Bumps the async_op_search connection
 *     statistic, then tags the handle as WT_AOP_SEARCH and enqueues it via __async_op_wrap. The
 *     function's result is produced by API_END_RET from ret.
 */
static int
__async_search(WT_ASYNC_OP *asyncop)
{
    WT_ASYNC_OP_IMPL *op;
    WT_DECL_RET;
    WT_SESSION_IMPL *session;

    op = (WT_ASYNC_OP_IMPL *)asyncop;
    /*
     * session is not assigned anywhere else in this function, so the API-call macro presumably
     * fills it in; the macro is defined outside this file.
     */
    ASYNCOP_API_CALL(O2C(op), session, search);
    WT_STAT_CONN_INCR(O2S(op), async_op_search);
    /* The statistic is updated before the enqueue is attempted. */
    WT_ERR(__async_op_wrap(op, WT_AOP_SEARCH));
err:
    API_END_RET(session, ret);
}

/*
 * __async_insert --
 *     WT_ASYNC_OP->insert implementation for op handles. Bumps the async_op_insert connection
 *     statistic, then tags the handle as WT_AOP_INSERT and enqueues it via __async_op_wrap. The
 *     function's result is produced by API_END_RET from ret.
 */
static int
__async_insert(WT_ASYNC_OP *asyncop)
{
    WT_ASYNC_OP_IMPL *op;
    WT_DECL_RET;
    WT_SESSION_IMPL *session;

    op = (WT_ASYNC_OP_IMPL *)asyncop;
    /*
     * session is not assigned anywhere else in this function, so the API-call macro presumably
     * fills it in; the macro is defined outside this file.
     */
    ASYNCOP_API_CALL(O2C(op), session, insert);
    WT_STAT_CONN_INCR(O2S(op), async_op_insert);
    /* The statistic is updated before the enqueue is attempted. */
    WT_ERR(__async_op_wrap(op, WT_AOP_INSERT));
err:
    API_END_RET(session, ret);
}

/*
 * __async_update --
 *     WT_ASYNC_OP->update implementation for op handles. Bumps the async_op_update connection
 *     statistic, then tags the handle as WT_AOP_UPDATE and enqueues it via __async_op_wrap. The
 *     function's result is produced by API_END_RET from ret.
 */
static int
__async_update(WT_ASYNC_OP *asyncop)
{
    WT_ASYNC_OP_IMPL *op;
    WT_DECL_RET;
    WT_SESSION_IMPL *session;

    op = (WT_ASYNC_OP_IMPL *)asyncop;
    /*
     * session is not assigned anywhere else in this function, so the API-call macro presumably
     * fills it in; the macro is defined outside this file.
     */
    ASYNCOP_API_CALL(O2C(op), session, update);
    WT_STAT_CONN_INCR(O2S(op), async_op_update);
    /* The statistic is updated before the enqueue is attempted. */
    WT_ERR(__async_op_wrap(op, WT_AOP_UPDATE));
err:
    API_END_RET(session, ret);
}

/*
 * __async_remove --
 *     WT_ASYNC_OP->remove implementation for op handles. Bumps the async_op_remove connection
 *     statistic, then tags the handle as WT_AOP_REMOVE and enqueues it via __async_op_wrap. The
 *     function's result is produced by API_END_RET from ret.
 */
static int
__async_remove(WT_ASYNC_OP *asyncop)
{
    WT_ASYNC_OP_IMPL *op;
    WT_DECL_RET;
    WT_SESSION_IMPL *session;

    op = (WT_ASYNC_OP_IMPL *)asyncop;
    /*
     * session is not assigned anywhere else in this function, so the API-call macro presumably
     * fills it in; the macro is defined outside this file.
     */
    ASYNCOP_API_CALL(O2C(op), session, remove);
    WT_STAT_CONN_INCR(O2S(op), async_op_remove);
    /* The statistic is updated before the enqueue is attempted. */
    WT_ERR(__async_op_wrap(op, WT_AOP_REMOVE));
err:
    API_END_RET(session, ret);
}

/*
 * __async_compact --
 *     WT_ASYNC_OP->compact implementation for op handles. Bumps the async_op_compact connection
 *     statistic, then tags the handle as WT_AOP_COMPACT and enqueues it via __async_op_wrap. The
 *     function's result is produced by API_END_RET from ret.
 */
static int
__async_compact(WT_ASYNC_OP *asyncop)
{
    WT_ASYNC_OP_IMPL *op;
    WT_DECL_RET;
    WT_SESSION_IMPL *session;

    op = (WT_ASYNC_OP_IMPL *)asyncop;
    /*
     * session is not assigned anywhere else in this function, so the API-call macro presumably
     * fills it in; the macro is defined outside this file.
     */
    ASYNCOP_API_CALL(O2C(op), session, compact);
    WT_STAT_CONN_INCR(O2S(op), async_op_compact);
    /* The statistic is updated before the enqueue is attempted. */
    WT_ERR(__async_op_wrap(op, WT_AOP_COMPACT));
err:
    API_END_RET(session, ret);
}

/*
 * __async_get_id --
 *     WT_ASYNC_OP->get_id implementation for op handles: return the unique_id field stored in the
 *     implementation structure behind the public handle.
 */
static uint64_t
__async_get_id(WT_ASYNC_OP *asyncop)
{
    WT_ASYNC_OP_IMPL *op;

    op = (WT_ASYNC_OP_IMPL *)asyncop;
    return (op->unique_id);
}

/*
 * __async_get_type --
 *     WT_ASYNC_OP->get_type implementation for op handles: return the optype field stored in the
 *     implementation structure behind the public handle.
 */
static WT_ASYNC_OPTYPE
__async_get_type(WT_ASYNC_OP *asyncop)
{
    WT_ASYNC_OP_IMPL *op;

    op = (WT_ASYNC_OP_IMPL *)asyncop;
    return (op->optype);
}

/*
 * __async_op_init --
 *     Put a single op handle into its initial state: install the method table, attach the handle to
 *     the connection, reset the embedded cursor and mark the handle free under the given id.
 */
static void
__async_op_init(WT_CONNECTION_IMPL *conn, WT_ASYNC_OP_IMPL *op, uint32_t id)
{
    WT_ASYNC_OP *asyncop;

    asyncop = (WT_ASYNC_OP *)op;

    /* Public methods of the op handle. */
    asyncop->search = __async_search;
    asyncop->insert = __async_insert;
    asyncop->update = __async_update;
    asyncop->remove = __async_remove;
    asyncop->compact = __async_compact;
    asyncop->get_key = __async_get_key;
    asyncop->set_key = __async_set_key;
    asyncop->get_value = __async_get_value;
    asyncop->set_value = __async_set_value;
    asyncop->get_id = __async_get_id;
    asyncop->get_type = __async_get_type;

    /* The handle is tied to this connection and starts out without key or value formats. */
    asyncop->connection = (WT_CONNECTION *)conn;
    asyncop->value_format = NULL;
    asyncop->key_format = NULL;

    /*
     * The embedded cursor gets the generic get/set key/value functions, the connection's default
     * session, and cleared key/value state.
     */
    asyncop->c.session = (WT_SESSION *)conn->default_session;
    asyncop->c.value_format = NULL;
    asyncop->c.key_format = NULL;
    asyncop->c.get_key = __wt_cursor_get_key;
    asyncop->c.set_key = __wt_cursor_set_key;
    asyncop->c.get_value = __wt_cursor_get_value;
    asyncop->c.set_value = __wt_cursor_set_value;
    asyncop->c.recno = WT_RECNO_OOB;
    memset(asyncop->c.raw_recno_buf, 0, sizeof(asyncop->c.raw_recno_buf));
    memset(&asyncop->c.key, 0, sizeof(asyncop->c.key));
    memset(&asyncop->c.value, 0, sizeof(asyncop->c.value));
    asyncop->c.saved_err = 0;
    asyncop->c.flags = 0;

    /* Implementation-private bookkeeping. */
    op->internal_id = id;
    op->state = WT_ASYNCOP_FREE;
}

/*
 * __wt_async_op_enqueue --
 *     Enqueue an operation onto the work queue. Fails with EINVAL (through WT_RET_MSG) if the
 *     handle is not in the WT_ASYNCOP_READY state. Otherwise the op is stored in a ring-buffer
 *     slot, marked WT_ASYNCOP_ENQUEUED, and the queue head is advanced to cover it before zero is
 *     returned. In diagnostic builds a non-empty target slot results in a panic return.
 */
int
__wt_async_op_enqueue(WT_SESSION_IMPL *session, WT_ASYNC_OP_IMPL *op)
{
    WT_ASYNC *async;
    WT_CONNECTION_IMPL *conn;
    uint64_t cur_head, cur_tail, my_alloc, my_slot;
#ifdef HAVE_DIAGNOSTIC
    WT_ASYNC_OP_IMPL *my_op;
#endif

    conn = S2C(session);
    async = conn->async;

    /*
     * If an application re-uses a WT_ASYNC_OP, we end up here with an invalid object.
     */
    if (op->state != WT_ASYNCOP_READY)
        WT_RET_MSG(session, EINVAL, "application error: WT_ASYNC_OP already in use");

    /*
     * Enqueue op at the tail of the work queue. We get our slot in the ring buffer to use. The
     * value obtained from the atomic add on alloc_head is this caller's allocation number; the
     * slot is that number modulo the queue size.
     */
    my_alloc = __wt_atomic_add64(&async->alloc_head, 1);
    my_slot = my_alloc % async->async_qsize;

    /*
     * Make sure we haven't wrapped around the queue. If so, wait for the tail to advance off this
     * slot.
     */
    WT_ORDERED_READ(cur_tail, async->tail_slot);
    while (cur_tail == my_slot) {
        __wt_yield();
        WT_ORDERED_READ(cur_tail, async->tail_slot);
    }

#ifdef HAVE_DIAGNOSTIC
    /* The slot about to be filled is expected to be empty; anything else is treated as fatal. */
    WT_ORDERED_READ(my_op, async->async_queue[my_slot]);
    if (my_op != NULL)
        return (__wt_panic(session));
#endif
    /* Store the op in its slot and mark the handle as enqueued. */
    WT_PUBLISH(async->async_queue[my_slot], op);
    op->state = WT_ASYNCOP_ENQUEUED;
    /*
     * Count the op in cur_queue and raise max_queue if the new count exceeds it.
     *
     * NOTE(review): max_queue is published from a fresh read of cur_queue rather than from the
     * value returned by the atomic add, and the compare/publish pair is not atomic; with
     * concurrent callers the recorded maximum may be imprecise. Confirm that is acceptable.
     */
    if (__wt_atomic_add32(&async->cur_queue, 1) > async->max_queue)
        WT_PUBLISH(async->max_queue, async->cur_queue);
    /*
     * Multiple threads may be adding ops to the queue. We need to wait our turn to make our slot
     * visible to workers. Our turn comes when head equals the allocation number just before ours;
     * head is then advanced to our allocation number.
     */
    WT_ORDERED_READ(cur_head, async->head);
    while (cur_head != (my_alloc - 1)) {
        __wt_yield();
        WT_ORDERED_READ(cur_head, async->head);
    }
    WT_PUBLISH(async->head, my_alloc);
    return (0);
}

/*
 * __wt_async_op_init --
 *     Initialize all the op handles: the flush op embedded in the async structure, the work queue
 *     and the array of user op handles. Returns zero on success; if the op array cannot be
 *     allocated, both allocations are released and the error is returned.
 */
int
__wt_async_op_init(WT_SESSION_IMPL *session)
{
    WT_ASYNC *async;
    WT_ASYNC_OP_IMPL *op;
    WT_CONNECTION_IMPL *conn;
    WT_DECL_RET;
    uint32_t i;

    conn = S2C(session);
    async = conn->async;

    /* The flush op is set up first and is given the invalid index rather than an array slot. */
    __async_op_init(conn, &async->flush_op, OPS_INVALID_INDEX);

    /*
     * Allocate the work queue. It holds two more entries than there are user ops: the ring buffer
     * is sized so the head can never overlap the tail, with extra room for the flush op.
     */
    async->async_qsize = conn->async_size + 2;
    WT_RET(__wt_calloc_def(session, async->async_qsize, &async->async_queue));

    /* Allocate the user ops, then initialize each one with its array index as the id. */
    WT_ERR(__wt_calloc_def(session, conn->async_size, &async->async_ops));
    for (i = 0, op = async->async_ops; i < conn->async_size; ++i, ++op)
        __async_op_init(conn, op, i);
    return (0);

err:
    __wt_free(session, async->async_ops);
    __wt_free(session, async->async_queue);
    return (ret);
}
